package com.matrix.cantordemo.userbusiness;

import cn.hutool.core.lang.Console;
import com.cantor.consumer.future.FuturesKeeper;
import com.cantor.consumer.handler.PongHandler;
import com.cantor.consumer.start.ConsumerNettyKeeper;
import com.cantor.core.center.RegistrationCenter;
import com.cantor.core.center.impl.ZooKeeperRegistrationCenter;
import com.cantor.core.handler.CantorMessageCodec;
import com.cantor.core.handler.StickAndHalfPackageDecoder;
import com.cantor.core.message.CantorRequestMessage;
import com.cantor.core.message.CantorResponseMessage;
import com.cantor.core.pool.CantorExecutorPool;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Manual integration test that fires concurrent RPC requests at a running provider.
 *
 * <p>{@link #init()} starts a ZooKeeper-backed registration center and builds a Netty client
 * {@link Bootstrap}; {@link #destroy()} releases both. The request test itself sends its
 * messages through {@code ConsumerNettyKeeper}; the locally built bootstrap is only used by
 * {@link #getChannel(String, Integer)}.
 */
@Slf4j
public class SimpleTest2 {

    /** Address of the provider the request test talks to. */
    private static final String PROVIDER_HOST = "192.168.199.109";
    private static final int PROVIDER_PORT = 8101;

    /** Upper bound for the whole batch, so a lost response cannot hang the test forever. */
    private static final long AWAIT_TIMEOUT_SECONDS = 60L;

    private static EventLoopGroup group;
    private static Bootstrap bootstrap;
    private static Map<String, Channel> chMap = new HashMap<>();

    private RegistrationCenter center;

    /**
     * Starts the registration center and prepares the Netty client bootstrap.
     *
     * @throws InterruptedException propagated from {@link #runNettyClient()}
     */
    @BeforeEach
    public void init() throws InterruptedException {
        center = new ZooKeeperRegistrationCenter().setNamespace("cantor");
        center.run();
        runNettyClient();
    }

    /**
     * Closes the registration center and releases the Netty resources created by
     * {@link #runNettyClient()}.
     */
    @AfterEach
    public void destroy() {
        try {
            // center stays null if init() failed before assigning it.
            if (center != null) {
                center.close();
            }
        } finally {
            // Cached channels belong to the event loop group that is about to stop.
            chMap.clear();
            if (group != null) {
                group.shutdownGracefully();
                group = null;
            }
        }
    }

    /**
     * Builds the client {@link Bootstrap} used to send requests to a provider by hand.
     *
     * <p>The event loop group is stored in the static field so that {@link #destroy()} can
     * shut it down.
     *
     * @throws InterruptedException declared for callers; nothing in this method blocks
     */
    public void runNettyClient() throws InterruptedException {
        // Assign the field (not a shadowing local) so the group can be released later.
        group = new NioEventLoopGroup(1);
        bootstrap = new Bootstrap().group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(65535))
                .option(ChannelOption.TCP_NODELAY, true) // send packets immediately
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // Duplex handlers
                        // pipeline.addLast(new LoggingHandler()); // logging
                        pipeline.addLast(new IdleStateHandler(10L, 0, 0, TimeUnit.SECONDS)); // read-idle detection
                        pipeline.addLast(new CantorMessageCodec()); // message codec

                        // Inbound handlers
                        // NOTE(review): the frame decoder is registered after the codec, so inbound
                        // bytes reach the codec first - confirm this order is intended.
                        pipeline.addLast(new StickAndHalfPackageDecoder()); // sticky/half packet handling
                        pipeline.addLast(new PongHandler()); // heartbeat handling
                        pipeline.addLast(new SimpleChannelInboundHandler<CantorResponseMessage>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, CantorResponseMessage res) {
                                log.info("读到回应{}", res.getSequenceId());
                                // Complete the future registered under this response's sequence id.
                                FuturesKeeper.checkout(res.getSequenceId()).complete(res.getReturnValue());
                            }
                        });
                    }
                });
    }

    /**
     * Returns a channel to the given endpoint, connecting when no active channel is cached.
     *
     * @param host provider host
     * @param port provider port
     * @return the cached channel if it is still active, otherwise a freshly connected one
     * @throws InterruptedException if interrupted while waiting for the connect to finish
     */
    Channel getChannel(String host, Integer port) throws InterruptedException {
        // The separator keeps e.g. ("10.0.0.1", 10) and ("10.0.0.11", 0) from sharing a key.
        String key = host + ":" + port;
        Channel channel = chMap.get(key);
        // Reconnect when nothing is cached or the cached channel is no longer usable.
        if (channel == null || !channel.isActive()) {
            channel = bootstrap.connect(host, port)
                    .sync()
                    .channel();
            chMap.put(key, channel);
        }
        return channel;
    }

    /**
     * Sends {@code nThreads} concurrent {@code singleCall} requests and waits for every response.
     *
     * @throws InterruptedException if interrupted while waiting for the requests to finish
     * @throws IllegalStateException if the batch does not finish within the timeout
     */
    @Test
    public void testNettyClient() throws InterruptedException {
        ConsumerNettyKeeper.addConnection(PROVIDER_HOST, PROVIDER_PORT);
        final int nThreads = 100;
        final CountDownLatch latch = new CountDownLatch(nThreads);
        final CompletableFuture<?>[] tasks = new CompletableFuture<?>[nThreads];
        for (int i = 0; i < nThreads; i++) {
            tasks[i] = CompletableFuture.runAsync(() -> {
                try {
                    sendSingleCall();
                } finally {
                    // Always count down, otherwise one failed request would block the await below.
                    latch.countDown();
                }
            }, CantorExecutorPool.pool());
        }

        if (!latch.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Requests still pending after "
                    + AWAIT_TIMEOUT_SECONDS + "s: " + latch.getCount());
        }
        // Rethrows the first worker failure so a broken request fails the test.
        CompletableFuture.allOf(tasks).join();
        System.out.println(nThreads + "次请求全部完成");
    }

    /**
     * Builds one {@code singleCall} request with a fresh sequence id, sends it through
     * {@code ConsumerNettyKeeper} and blocks until its future completes.
     */
    private void sendSingleCall() {
        final long sequenceId;
        try {
            sequenceId = center.getSequenceId();
        } catch (Exception e) {
            // Do not fall back to id 0: concurrent requests would then share one sequence id.
            throw new IllegalStateException("Failed to obtain a sequence id", e);
        }
        long start = System.currentTimeMillis();
        CantorRequestMessage requestMessage = CantorRequestMessage.builder()
                .interfaceName("com.matrix.cantordemo.api.service.ProductRpcService")
                .serviceVersion("")
                .methodName("singleCall")
                .parameterTypes(new Class[]{String.class})
                .parameterValue(new Object[]{"Message-" + sequenceId})
                .returnType(String.class)
                .build();
        requestMessage.setSequenceId(sequenceId);
        long getChannelStart = System.currentTimeMillis();
        CompletableFuture<?> future =
                ConsumerNettyKeeper.sendCantorRequestMessage(PROVIDER_HOST, PROVIDER_PORT, requestMessage);
        System.err.println("获取channel就花费了" + (System.currentTimeMillis() - getChannelStart));
        future.join(); // blocks until the future completes
        Console.error("{} 耗时: {}", sequenceId, System.currentTimeMillis() - start);
    }

}
